/**
 * Copyright 2005 The Apache Software Foundation
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.ipc;

import java.io.IOException;
import java.io.EOFException;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.StringWriter;
import java.io.PrintWriter;

import java.net.Socket;
import java.net.ServerSocket;
import java.net.SocketException;
import java.net.SocketTimeoutException;

import java.util.LinkedList;
import java.util.logging.Logger;
import java.util.logging.Level;

import org.apache.hadoop.util.LogFormatter;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.UTF8;

/**
 * An abstract IPC service.  IPC calls take a single {@link Writable} as a
 * parameter, and return a {@link Writable} as their value.  A service runs on
 * a port and is defined by a parameter class and a value class.
 *
 * @author Doug Cutting
 * @see Client
 */
public abstract class Server {
    public static final Logger LOG =
            LogFormatter.getLogger("org.apache.hadoop.ipc.Server");

    private static final ThreadLocal SERVER = new ThreadLocal();

    /**
     * Returns the server instance called under or null.  May be called under
     * {@link #call(Writable)} implementations, and under {@link Writable}
     * methods of paramters and return values.  Permits applications to access
     * the server context.
     */
    public static Server get() {
        return (Server) SERVER.get();
    }

    // port we listen on
    private final int port;
    // number of handler threads
    private final int handlerCount;
    // max number of queued calls
    private final int maxQueuedCalls;
    /**
     * @author mac
     * @date 2021/9/28
     * 描述：Invocation.class
     */
    // class of call parameters
    private final Class paramClass;
    private final Configuration conf;

    private int timeout;

    // true while server runs
    private boolean running = true;
    // queued calls
    private final LinkedList callQueue = new LinkedList();
    // used by wait/notify
    private final Object callDequeued = new Object();

    /**
     * A call queued for handling.
     */
    private static class Call {
        // the client's call id
        private final int id;
        // the parameter passed
        private final Writable param;
        // connection to client
        private final Connection connection;

        public Call(int id, Writable param, Connection connection) {
            this.id = id;
            this.param = param;
            this.connection = connection;
        }
    }

    /** Listens on the socket, starting new connection threads. */
    /**
     * @author mac
     * @date 2021/9/28
     * 描述：1、监听客户端连接
     * 2、客户端的rpc调用放入队列
     */
    private class Listener extends Thread {
        private final ServerSocket socket;

        public Listener() throws IOException {
            this.socket = new ServerSocket(port);
            socket.setSoTimeout(timeout);
            this.setDaemon(true);
            this.setName("Server listener on port " + port);
        }

        @Override
        public void run() {
            LOG.info(getName() + ": starting");
            while (running) {
                try {
                    new Connection(socket.accept()).start(); // start a new connection
                } catch (SocketTimeoutException e) {      // ignore timeouts
                } catch (Exception e) {                   // log all other exceptions
                    LOG.log(Level.INFO, getName() + " caught: " + e, e);
                }
            }
            try {
                socket.close();
            } catch (IOException e) {
                LOG.info(getName() + ": e=" + e);
            }
            LOG.info(getName() + ": exiting");
        }
    }

    /**
     * Reads calls from a connection and queues them for handling.
     */
    private class Connection extends Thread {
        private final Socket socket;
        private final DataInputStream in;
        private final DataOutputStream out;

        public Connection(Socket socket) throws IOException {
            this.socket = socket;
            socket.setSoTimeout(timeout);
            this.in = new DataInputStream
                    (new BufferedInputStream(socket.getInputStream()));
            this.out = new DataOutputStream
                    (new BufferedOutputStream(socket.getOutputStream()));
            this.setDaemon(true);
            this.setName("Server connection on port " + port + " from "
                    + socket.getInetAddress().getHostAddress());
        }

        @Override
        public void run() {
            LOG.info(getName() + ": starting");
            SERVER.set(Server.this);
            try {
                while (running) {
                    int id;
                    try {
                        id = in.readInt();                    // try to read an id
                    } catch (SocketTimeoutException e) {
                        continue;
                    }

                    if (LOG.isLoggable(Level.FINE)) {
                        LOG.fine(getName() + " got #" + id);
                    }

                    Writable param = makeParam();           // read param
                    param.readFields(in);
                    System.out.println("接受到消息:" + param.hashCode());

                    Call call = new Call(id, param, this);

                    synchronized (callQueue) {
                        callQueue.addLast(call);              // queue the call
                        callQueue.notify();                   // wake up a waiting handler
                    }

                    while (running && callQueue.size() >= maxQueuedCalls) {
                        synchronized (callDequeued) {         // queue is full
                            callDequeued.wait(timeout);         // wait for a dequeue
                        }
                    }
                }
            } catch (EOFException eof) {
                // This is what happens on linux when the other side shuts down
            } catch (SocketException eof) {
                // This is what happens on Win32 when the other side shuts down
            } catch (Exception e) {
                LOG.log(Level.INFO, getName() + " caught: " + e, e);
            } finally {
                try {
                    socket.close();
                } catch (IOException ignored) {
                }
                LOG.info(getName() + ": exiting");
            }
        }

    }

    /** Handles queued calls . */
    /**
     * @author mac
     * @date 2021/9/28
     * 描述：处理rpc调用
     */
    private class Handler extends Thread {
        public Handler(int instanceNumber) {
            this.setDaemon(true);
            this.setName("Server handler " + instanceNumber + " on " + port);
        }

        @Override
        public void run() {
            LOG.info(getName() + ": starting");
            SERVER.set(Server.this);
            while (running) {
                try {
                    Call call;
                    synchronized (callQueue) {
                        while (running && callQueue.size() == 0) { // wait for a call
                            callQueue.wait(timeout);
                        }
                        /**
                         * @author mac
                         * @date 2021/9/28
                         * 描述：等太久，可能running为false
                         */
                        if (!running) {
                            break;
                        }

                        /**
                         * @author mac
                         * @date 2021/9/28
                         * 描述：取出队列最前面的rpc调用
                         */
                        call = (Call) callQueue.removeFirst(); // pop the queue
                    }

                    /**
                     * @author mac
                     * @date 2021/9/28
                     * 描述：1、上面执行完，表示rpc队列不是满的
                     *      2、通知rpc队列可以接收其他的调用
                     */
                    synchronized (callDequeued) {           // tell others we've dequeued
                        callDequeued.notify();
                    }

                    if (LOG.isLoggable(Level.FINE)) {
                        LOG.fine(getName() + ": has #" + call.id + " from " +
                                call.connection.socket.getInetAddress().getHostAddress());
                    }

                    String error = null;
                    Writable value = null;
                    try {
                        /**
                         * @author mac
                         * @date 2021/9/28
                         * 描述：处理rpc调用
                         */
                        // make the call
                        value = call(call.param);
                    } catch (Exception e) {
                        LOG.log(Level.INFO, getName() + " call error: " + e, e);
                        error = getStackTrace(e);
                    }

                    DataOutputStream out = call.connection.out;
                    synchronized (out) {
                        // write call id
                        out.writeInt(call.id);
                        // write error flag
                        out.writeBoolean(error != null);
                        if (error != null) {
                            value = new UTF8(error);
                        }
                        // write value
                        value.write(out);
                        out.flush();
                    }

                } catch (Exception e) {
                    LOG.log(Level.INFO, getName() + " caught: " + e, e);
                }
            }
            LOG.info(getName() + ": exiting");
        }

        private String getStackTrace(Throwable throwable) {
            StringWriter stringWriter = new StringWriter();
            PrintWriter printWriter = new PrintWriter(stringWriter);
            throwable.printStackTrace(printWriter);
            printWriter.flush();
            return stringWriter.toString();
        }

    }

    /**
     * Constructs a server listening on the named port.  Parameters passed must
     * be of the named class.  The <code>handlerCount</handlerCount> determines
     * the number of handler threads that will be used to process calls.
     */
    protected Server(int port, int handlerCount, Configuration conf) {
        this.conf = conf;
        this.port = port;
        this.paramClass = RPC.Invocation.class;
        this.handlerCount = handlerCount;
        this.maxQueuedCalls = handlerCount;
        this.timeout = conf.getInt("ipc.client.timeout", 10000);
    }

    /**
     * Sets the timeout used for network i/o.
     */
    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    /**
     * Starts the service.  Must be called before any calls will be handled.
     */
    public synchronized void start() throws IOException {
        Listener listener = new Listener();
        listener.start();

        for (int i = 0; i < handlerCount; i++) {
            Handler handler = new Handler(i);
            handler.start();
        }
    }

    /**
     * Stops the service.  No new calls will be handled after this is called.  All
     * subthreads will likely be finished after this returns.
     */
    public synchronized void stop() {
        LOG.info("Stopping server on " + port);
        running = false;
        try {
            //  inexactly wait for pending requests to finish
            Thread.sleep(timeout);
        } catch (InterruptedException ignored) {
        }
        notifyAll();
    }

    /**
     * Wait for the server to be stopped.
     * Does not wait for all subthreads to finish.
     * See {@link #stop()}.
     */
    public synchronized void join() throws InterruptedException {
        while (running) {
            wait();
        }
    }

    /**
     * Called for each call.
     */
    public abstract Writable call(Writable param) throws IOException;


    private Writable makeParam() {
        Writable param;                               // construct param
        try {
            param = (Writable) paramClass.newInstance();
            if (param instanceof Configurable) {
                ((Configurable) param).setConf(conf);
            }
        } catch (InstantiationException | IllegalAccessException e) {
            throw new RuntimeException(e.toString());
        }
        return param;
    }

}
